作者:歌手王紫璇 | 来源:互联网 | 2024-12-16 10:17
前言:本文旨在深入解析基于RabbitMQ的消息队列技术实现延迟任务的方法,涵盖从基础概念到实际应用的全过程,适用于对消息队列及延迟任务感兴趣的开发人员和技术爱好者。
一、延迟任务概述
延迟任务在现代互联网应用中十分常见,如订单超时自动取消、支付回调重试等。这些场景的特点是在未来某一时刻执行特定操作,并且通常只需执行一次。对于订单超时取消这类任务,由于其幂等性,不必担心重复消费的问题;而对于支付回调重试,则需要特别注意避免重复处理同一笔交易。
1、工作原理
在RabbitMQ中实现延迟任务,生产者将带有延迟属性的消息发送至指定的交换机。该交换机会根据设定的延迟时间将消息转发给相应的队列,消费者则通过监听队列来获取并处理这些消息。值得注意的是,为了保证系统的高可用性和稳定性,必须采取措施确保消息在交换机中的持久化存储,以防因服务中断导致未完成的任务丢失。
2、技术选型
二、系统设计方案
(一)环境配置
为支持延迟消息功能,需在RabbitMQ服务器上安装插件。此插件允许开发者指定消息的延迟时间,从而实现精准的定时任务调度。
(二)生产者实现
生产者的职责是确保消息能够准确无误地发送到RabbitMQ。为此,我们采用confirm确认机制,即消息发送后等待RabbitMQ的确认回复,以验证消息是否成功到达。例如,在创建订单的过程中,首先将订单信息保存到数据库和Redis缓存中,随后发送带有延迟属性的消息至RabbitMQ。一旦接收到确认响应,便从Redis中移除相关记录;若未能成功发送,则重新尝试直至成功。
(三)消费者实现
消费者负责接收并处理来自队列的消息,关键在于确保消息不会丢失且能正确处理。这包括手动确认每条消息的接收状态,保持消费者的长期在线,以及为可能出现的错误提供重试机制。特别是对于那些幂等性的操作,如订单取消,无需额外处理重复消费的情况。
三、Spring Boot实践示例
以下是使用Spring Boot框架实现延迟任务的部分核心代码,完整项目可在GitHub上找到。
(一)生产者代码片段
在处理订单时,首先确保订单数据的安全存储,然后利用RabbitMQ发送带有延迟属性的消息。
for (long i = 1; i <= 10; i++) {
// 模拟生成订单
BuOrder order = createOrder(i);
// 订单入库
orderService.saveOrUpdate(order);
// 将订单存入Redis
RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order);
// 向RabbitMQ异步投递消息
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}
生产者通过确认机制确保消息的可靠传递:
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlatiOnData== null) return;
String key = RabbitTemplateConfig.ORDER_PREFIX + correlationData.getId();
if (ack) {
// 消息成功投递,删除Redis中订单数据
RedisUtils.deleteObject(key);
} else {
// 从Redis中读取订单数据,重新投递
BuOrder order = RedisUtils.getObject(key, BuOrder.class);
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}
}
(二)消费者代码片段
消费者端通过手动确认消息的消费,确保消息处理的可靠性,并具备自动重试功能。
@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME)
public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException {
if (order.getOrderStatus() == 0) {
// 更新订单状态为已关闭
orderService.updateById(new BuOrder(order.getOrderId(), -1));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info(String.format("消费者节点01消费编号为【%s】的消息", order.getOrderId()));
}
}
建议部署多个消费者实例,以提高消息处理效率,减少队列积压。
(三)辅助工具类
文中提及的RabbitUtils工具类包含了一些与RabbitMQ交互的基础方法,该类位于以下Maven依赖中:
xin.altitude.cms
ucode-cms-common
1.4.3.1